December 18, 2014

Why?

Why parallel?

  • Use your whole computer for embarrassingly parallel tasks
  • Large gain in efficiency for a small amount of effort

Why parallel?

The parallel package merges multicore and snow

  • Ships with recent versions of R
  • Provides drop-in replacements for most of their functionality, with integrated handling of random-number generation
  • By default, uses multicore functionality on Unix-like systems and snow functionality on Windows
  • (We can also use the snow functionality to execute on a cluster for both systems)

Getting started

Parallel backend

Make the desired number of cores available to R:

require(doParallel)

# Determine number of CPU cores in your machine
nCores = detectCores()

# Create cluster with desired number of cores
cl = makeCluster(nCores)

# Register cluster
registerDoParallel(cl)

Parallel apply

  • foreach parallelizes for loops
  • parallel does the same for apply functions
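As a minimal sketch of the foreach syntax (assuming the foreach package is installed): `%do%` runs the loop sequentially in the current session, and once a backend is registered, swapping in `%dopar%` farms the iterations out to the workers.

```r
library(foreach)

# %do% evaluates the loop body sequentially in the current session
sq <- foreach(i = 1:5, .combine = "c") %do% {
  i^2
}
sq

# with a parallel backend registered (e.g. via registerDoParallel),
# replacing %do% with %dopar% runs the same iterations on the workers
```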

Parallel apply

Consider this simple example:

input = 1:100
input.list = as.list(input)

testFun = function(i){
  mu = mean(runif(1e+06, 0, i))
  return(mu)
}

(There is something wrong here… we'll come back to this.)

Parallel apply

How would we do this with foreach?

system.time(
  mu.foreach <- foreach(i = 1:100, .combine = "c") %dopar% {
    testFun(i)
  }
)
##    user  system elapsed 
##   0.104   0.011   3.311

Parallel apply

Now let's try it with sapply and parSapply

system.time(
  sapply(input, testFun)
  )
##    user  system elapsed 
##   5.535   0.205   5.972
system.time(
  parSapply(cl, input, testFun)
  )
##    user  system elapsed 
##   0.004   0.000   2.929

Parallel apply

Now let's try it with lapply and parLapply

system.time(
  lapply(input.list, testFun)
  )
##    user  system elapsed 
##   5.352   0.231   6.127
system.time(
  parLapply(cl, input.list, testFun)
  )
##    user  system elapsed 
##   0.003   0.000   3.343

Parallel apply

Now let's try it with mclapply

# not available on Windows
system.time(
  mclapply(input.list, testFun, mc.cores=nCores)
  )
##    user  system elapsed 
##   5.577   0.708   3.611

pvec

We can also parallelize vector map functions using pvec:

require(fields)
d = runif(5e+06, 0.1, 10)
system.time(Matern(d))
##    user  system elapsed 
##   3.700   0.207   4.396
system.time(pvec(d, Matern, mc.cores=nCores))
##    user  system elapsed 
##   4.091   0.585   2.640
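Note that pvec works by splitting the vector into chunks, applying the function to each chunk in a forked child, and concatenating the results, so the function must already be vectorized. A quick sanity check (a sketch; forking means this needs `mc.cores = 1` on Windows):

```r
library(parallel)

x <- runif(1e5)
res <- pvec(x, sqrt, mc.cores = 2)  # sqrt applied chunk-wise in forked children
identical(res, sqrt(x))             # same values as the serial call
```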

Random number generation

The issue

  • Care is needed when parallelizing a process that generates (pseudo-)random numbers
  • We want the different processes to draw from independent (and reproducible) random-number streams
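To see the issue: seeding the master process with set.seed does not control the workers' generators. A small sketch, assuming a local two-worker cluster:

```r
library(parallel)

cl <- makeCluster(2)
set.seed(0)                      # seeds only the master process
a <- parSapply(cl, 1:2, function(i) runif(1))
set.seed(0)
b <- parSapply(cl, 1:2, function(i) runif(1))
identical(a, b)                  # typically FALSE: workers ignore the master seed
stopCluster(cl)
```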

clusterSetRNGStream

Let's go back and fix our first example:

input = 1:100
testFun = function(i){ mean(runif(1e+06, 0, i)) }
clusterSetRNGStream(cl, iseed=0)
res1 = parSapply(cl, input, testFun)

clusterSetRNGStream(cl, iseed=0)
res2 = parSapply(cl, input, testFun)
identical(res1, res2)
## [1] TRUE
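The multicore side has an analogous mechanism: with the L'Ecuyer-CMRG generator selected, set.seed on the master makes mclapply reproducible, since each forked child derives its stream from the master's seed. A sketch (forking, so not available on Windows):

```r
library(parallel)

RNGkind("L'Ecuyer-CMRG")   # generator that supports multiple independent streams
set.seed(0)
r1 <- mclapply(1:4, function(i) runif(1), mc.cores = 2)
set.seed(0)
r2 <- mclapply(1:4, function(i) runif(1), mc.cores = 2)
identical(r1, r2)          # each child re-derives the same stream
```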

Example: bootstrapping

Bootstrapping

  • An embarrassingly parallel task
  • Resampling the data relies on random-number generation

Bootstrapping

  • Predict number of arrivals at 5pm on a weekday for the NOHO station
  • Create a 95% bootstrap confidence interval for this prediction
[Figure: map of station locations]

Bootstrapping

run1 = function(...){
  # the ... absorbs the (unused) index that (par)lapply passes in
  require(boot); require(splines)
  load(url("http://www.stat.colostate.edu/~scharfh/CSP_parallel/data/arrivals_subset.RData"))
  bikePred = function(data, indices){
    d = data[indices,]
    big.glm <- glm(arrivals ~
                     bs(hour, degree = 4)*weekend
                   + bs(hour, degree = 4)*as.factor(id),
                   data = d, family = "poisson")
    mynewdat = data.frame(weekend=FALSE, id=293, hour=17)
    return(predict(object=big.glm, newdata=mynewdat, type="response"))
  }
  # 250 bootstrap replicates per worker
  boot(data=arrivals.sub, statistic=bikePred, R=250)
}

Bootstrapping

system.time(
  bike.boot <- do.call(c, lapply(seq_len(nCores), run1))
  )
##    user  system elapsed 
## 174.906  13.798 230.409
clusterSetRNGStream(cl, iseed=123)
system.time(
  bike.boot2 <- do.call(c, parLapply(cl, seq_len(nCores), run1))
  )
##    user  system elapsed 
##   0.008   0.002 113.577

Bootstrapping

[Figure: histogram of bootstrap replicates]

Bootstrapping

boot.ci(bike.boot2, type="perc")
## BOOTSTRAP CONFIDENCE INTERVAL CALCULATIONS
## Based on 1000 bootstrap replicates
## 
## CALL : 
## boot.ci(boot.out = bike.boot2, type = "perc")
## 
## Intervals : 
## Level     Percentile     
## 95%   (21.57, 25.13 )  
## Calculations and Intervals on Original Scale

Since the boot package has built-in parallel support, we could also simply run the following (this assumes arrivals.sub and bikePred are defined in the global environment rather than inside run1):

nBoot = nCores*250
set.seed(123, kind="L'Ecuyer-CMRG")
system.time(
  bike.boot3 <- boot(data=arrivals.sub, statistic=bikePred, R=nBoot, 
                    parallel="multicore", ncpus=nCores)
)
)
##    user  system elapsed 
## 186.661   9.374  72.778

Stop cluster

It's a good idea to stop your cluster when you are done using it.

stopCluster(cl)
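When the cluster lives inside a function, on.exit ties the shutdown to the function's exit, so the workers are reclaimed even if an error is thrown partway through. A sketch (par_means is an illustrative name, not part of any package):

```r
library(parallel)

# par_means: hypothetical helper that manages its own cluster lifecycle
par_means <- function(n) {
  cl <- makeCluster(2)
  on.exit(stopCluster(cl))   # runs even if the body below throws an error
  parSapply(cl, seq_len(n), function(i) mean(runif(1000, 0, i)))
}

res <- par_means(5)
length(res)  # 5
```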

References